import csv
import json

from otree.api import *
from decimal import *


author = "Nathaniel Archer Lawrence, LEMMA, Université Panthéon-Assas - Paris II"
doc = """
Consumption simulation with interface based off of 'Shopping app (online grocery store)' from oTree demos, see: .
"""


def read_csv():
    """Load the product catalog from ``catalog.csv`` in this app's folder.

    Returns:
        A list of dicts, one per CSV row. Every value is a string as read
        from the file except ``unit_price``, which is converted with ``cu``.
    """
    # The path is built from the module name, so it is resolved relative to
    # the current working directory.
    with open(__name__ + '/catalog.csv', encoding='utf-8-sig') as f:
        rows = list(csv.DictReader(f))
    for row in rows:
        # all values in CSV are string unless you convert them
        row['unit_price'] = cu(row['unit_price'])
    return rows


def read_csv_inflation():
    """Load the per-period inflation series from ``animal_spirits_120.csv``.

    Returns:
        A list of dicts, one per CSV row, with ``rate`` converted to a float
        and divided by 12, and ``period`` converted to an int.
    """
    with open(__name__ + '/animal_spirits_120.csv', encoding='utf-8-sig') as f:
        rows = list(csv.DictReader(f))
    for row in rows:
        # all values in CSV are string unless you convert them
        row['rate'] = float(row['rate']) / 12  # convert from the annualized rate
        row['period'] = int(row['period'])
    return rows


class C(BaseConstants):
    NAME_IN_URL = 'shop120_annualized'
    PLAYERS_PER_GROUP = None
    NUM_ROUNDS = 120

    # consumption constants
    INITIAL_ENDOWMENT = 120
    INCOME = 3.99
    INTEREST_RATE = 0.1/12
    CONSUMPTION_RATE = 1  # amount of good consumed from stock balance each period
    MONETARY_POLICY = 0

    # inflation parameters from csv
    INFLATION = read_csv_inflation()
    # period = ID in dict
    INFLATION_DICT = {row['period']: row for row in INFLATION}

    # list of products taken from csv file
    PRODUCTS = read_csv()
    # SKU = 'stock keeping unit' = product ID
    PRODUCTS_DICT = {row['sku']: row for row in PRODUCTS}


class Subsession(BaseSubsession):
    def creating_session(self):
        """Print the unit price of product '1' (doubled after round 1) for debugging."""
        # NOTE(review): the original read ``self.player.round_number``; the
        # subsession's own ``round_number`` is used instead. Whether oTree
        # calls this hook when it is defined as a method (rather than a
        # module-level function) should be confirmed.
        if self.round_number > 1:
            print("PRODUCTS_DICT[Unit price] * 2: ", (C.PRODUCTS_DICT['1']['unit_price'] * 2))
        else:
            print("PRODUCTS_DICT[Unit price]: ", (C.PRODUCTS_DICT['1']['unit_price']))


class Group(BaseGroup):
    pass


class Player(BasePlayer):
    # Per-round balances; all of these are written by live_method.
    total_price = models.CurrencyField(initial=0)
    initial_savings = models.CurrencyField(initial=0)
    cashOnHand = models.CurrencyField(initial=0)
    finalSavings = models.CurrencyField(initial=0)
    finalStock = models.IntegerField(initial=0)
    interestEarned = models.CurrencyField(initial=0)
    # JSON string describing the cart (skus, quantities, prices).
    decision = models.LongStringField(initial='')
    # Unit price in effect this round.
    newPrice = models.FloatField(initial=C.PRODUCTS_DICT['1']['unit_price'])

    # Response time
    responseTime = models.FloatField(initial=0)

    # Choice confidence
    current_choiceConfidence = models.IntegerField(
        choices=[
            [1, '1 – Not at all confident to have made the right decision'],
            [2, '2'],
            [3, '3'],
            [4, '4'],
            [5, '5 – Absolutely confident to have made the right decision'],
        ],
        label='How confident are you that you made the right decision this period?',
        widget=widgets.RadioSelect,
    )
    # previous_choiceConfidence = models.IntegerField(
    #     choices = [[1,'1 – Not at all confident to have made the right decision'],[2,'2'],[3,'3'],[4,'4'],[5,'5 – Absolutely confident to have made the right decision']],
    #     label='How confident are you with your decision in the previous period?',
    #     widget=widgets.RadioSelect
    # )

    # Inflation estimates
    inf = models.FloatField()


class Item(ExtraModel):
    # One cart line: a product and the quantity selected by a player.
    player = models.Link(Player)
    sku = models.StringField()
    name = models.StringField()
    quantity = models.IntegerField()
    unit_price = models.CurrencyField()
    newPrice = models.FloatField()


# Calculate total price of item with quantity selected
def total_price(item: Item):
    """Return the item's quantity multiplied by its unit price."""
    return item.quantity * item.unit_price


# Convert information about items into a dictionary
def to_dict(item: Item):
    """Return the item's sku, name, quantity and total price as a dict."""
    return dict(
        sku=item.sku,
        name=item.name,
        quantity=item.quantity,
        total_price=total_price(item),
    )


def _update_cart(player: Player, data):
    """Apply one add/remove request to the player's cart.

    Does nothing when ``data`` has no ``'sku'`` key. Otherwise ``data['delta']``
    is added to the quantity of the matching cart item; the item is deleted
    when its quantity drops to zero or below, and a new item is created at
    the player's current ``newPrice`` when none exists and ``delta`` is
    positive.

    Raises:
        KeyError: if ``data`` has ``'sku'`` but no ``'delta'``, or the sku is
            not in ``C.PRODUCTS_DICT``.
    """
    if 'sku' not in data:
        return

    sku = data['sku']
    delta = data['delta']
    product = C.PRODUCTS_DICT[sku]

    matches = Item.filter(player=player, sku=sku)
    if matches:
        # Unpacking fails loudly if there is ever more than one row per sku.
        [item] = matches
        item.quantity += delta
        if item.quantity <= 0:
            item.delete()
    elif delta > 0:
        Item.create(
            player=player,
            quantity=delta,
            sku=sku,
            name=product['name'],
            unit_price=player.newPrice,
        )


# Send info to HTML
def live_method(player: Player, data):
    """Update the cart and this round's balances, then return them to the page.

    Args:
        player: the player who sent the message.
        data: dict sent by the page; when it contains ``'sku'`` it must also
            contain ``'delta'`` (quantity change for that product).

    Returns:
        A dict keyed by ``player.id_in_group`` whose value holds the cart
        items and the player's updated balances, stock, decision and price.
    """
    round_number = player.round_number
    print("Round number: ", round_number)
    print("player new price csv: ", player.newPrice)

    inflation_rate = C.INFLATION_DICT[round_number]['rate']

    # The price is recomputed on every message, before the cart update, so
    # that newly created items are stored at this round's price.
    if round_number == 1:
        previous = None
        player.newPrice = float(C.PRODUCTS_DICT['1']['unit_price'] * (1 + inflation_rate))
    else:
        previous = player.in_round(round_number - 1)
        player.newPrice = previous.newPrice * (1 + inflation_rate)

    # allows subject to add and remove from shopping cart
    _update_cart(player, data)

    items = Item.filter(player=player)
    item_dicts = [to_dict(item) for item in items]
    quantity_bought = sum([item.quantity for item in items])
    player.total_price = sum([total_price(item) for item in items])

    if previous is None:
        # First round: start from the endowment with no stock and no interest.
        player.initial_savings = C.INITIAL_ENDOWMENT
        player.interestEarned = 0
        carried_stock = 0
    else:
        # Rate applied to last round's savings: the base interest rate plus
        # the policy coefficient times the previous round's inflation rate.
        savings_rate = C.INTEREST_RATE + (
            C.MONETARY_POLICY * C.INFLATION_DICT[round_number - 1]['rate']
        )
        # fetch previous period's final savings
        player.initial_savings = previous.finalSavings * (1 + savings_rate)
        player.interestEarned = previous.finalSavings * savings_rate
        # fetch previous period's final stock
        carried_stock = previous.finalStock
        print("inflation rate: ", inflation_rate)

    player.cashOnHand = player.initial_savings + C.INCOME
    player.finalSavings = player.cashOnHand - player.total_price
    player.finalStock = carried_stock + quantity_bought - C.CONSUMPTION_RATE

    player.decision = json.dumps({
        "item": [item.sku for item in items],
        "quantity": [item.quantity for item in items],
        "price": [float(item.unit_price) for item in items],
    })
    print('Current player decision: ', player.decision)
    print('Player new price: ', player.newPrice)

    return {
        player.id_in_group: dict(
            items=item_dicts,
            total_price=player.total_price,
            initial_savings=player.initial_savings,
            interestEarned=player.interestEarned,
            cashOnHand=player.cashOnHand,
            finalSavings=player.finalSavings,
            finalStock=player.finalStock,
            decision=player.decision,
            newPrice=player.newPrice,
        )
    }


def _has_failed(player: Player):
    """Return True when the player's final stock or final savings is negative."""
    return player.finalStock < 0 or player.finalSavings < 0


# PAGES
class MyPage(Page):
    live_method = live_method
    form_model = 'player'
    form_fields = ['responseTime']

    @staticmethod
    def error_message(player, value):
        """Block submission while the cart costs more than the cash on hand."""
        # NOTE(review): the message text contains a typo ("Your" -> "You");
        # it is left unchanged because it is shown to participants.
        if player.finalSavings < 0:
            return 'Your do not have sufficient Total Cash to complete this purchase.'


# ### Choice confidence question for period 1 should only ask about current decision
# class Results_1(Page):
#     form_model = 'player'
#     form_fields = ['current_choiceConfidence']
#     @staticmethod
#     def vars_for_template(player: Player):
#         return dict(items=Item.filter(player=player))
#     @staticmethod
#     def is_displayed(player):
#         return player.round_number == 1


### Choice confidence questions about current decision
class Results(Page):
    form_model = 'player'
    form_fields = ['current_choiceConfidence']

    @staticmethod
    def vars_for_template(player: Player):
        """Expose the player's cart items to the template."""
        return dict(items=Item.filter(player=player))

    # @staticmethod
    # def is_displayed(player):
    #     return player.round_number > 1

    @staticmethod
    def app_after_this_page(player, upcoming_apps):
        """After the last round, record the rounds survived and jump to 'go_no_go_2'."""
        # NOTE(review): returning here in the last round presumably skips the
        # PriceChanges and Failed pages that follow in page_sequence for that
        # round — confirm this is intended.
        if player.round_number == C.NUM_ROUNDS:
            player.participant.periods_survived = player.round_number
            print('periods survived: ', player.participant.periods_survived)
            return 'go_no_go_2'


class PriceChanges(Page):
    form_model = 'player'
    form_fields = ['inf']

    @staticmethod
    def is_displayed(player):
        """Show this page only in rounds that are a multiple of 12."""
        return player.round_number % 12 == 0

    @staticmethod
    def vars_for_template(player: Player):
        """Give the template the first round of the 12-round window ending now."""
        return dict(rounds12Before=player.round_number - 11)


class Failed(Page):
    @staticmethod
    def is_displayed(player: Player):
        """Show this page only when the player ended the round with negative stock or savings."""
        return _has_failed(player)

    @staticmethod
    def app_after_this_page(player, upcoming_apps):
        """Record the rounds survived and jump to 'go_no_go_2' when the player failed."""
        if _has_failed(player):
            player.participant.periods_survived = player.round_number
            return 'go_no_go_2'

    @staticmethod
    def before_next_page(player: Player, timeout_happened):
        """Store the current round number as the participant's periods survived."""
        participant = player.participant
        participant.periods_survived = player.round_number
        print('periods survived recorded: ', participant.periods_survived)


page_sequence = [MyPage, Results, PriceChanges, Failed]